RocketMQ msgId与offsetMsgId释疑(实战篇)
点击上方“中间件兴趣圈”,选择“设为星标”
做积极的人,越努力越幸运!
本文将详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。
1、抛出问题
1.1 从消息发送看消息ID
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
执行效果如图所示:
1.2 从消息消费看消息ID
package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println("MessageExt msg.getMsgId():" + msgs.get(0).getMsgId());
System.out.println("-------------------分割线-----------------");
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
执行效果如图所示:
2、消息ID释疑
从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?
msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。
offsetMsgId:消息偏移ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。
2.1 msgId 即全局唯一 ID 构建规则
从这张图可以看出,msgId确实是客户端生成的,接下来我们详细分析一下其生成算法。MessageClientIDSetter#createUniqID
public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING); // @1
sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2
return sb.toString();
}
一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。
2.1.1 FIX_STRING
MessageClientIDSetter静态代码块
static {
byte[] ip;
try {
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.position(0);
tempBuffer.put(ip);
tempBuffer.position(ip.length);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 2);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。
2.1.2 唯一性算法
msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。
private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int) (System.currentTimeMillis() - startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}
可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。
2.2 offsetMsgId构建规则
在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:MessageDecoder#createMessageId
public static String createMessageId(final ByteBuffer input ,
final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:
ByteBuffer input
用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)ByteBuffer addr
当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。long offset
消息的物理偏移量。
即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。
温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可以不必知道该消息所属的 topic 信息即可查询该条消息的内容。
2.3 消息发送与消息消费返回的消息ID信息
消息发送时会在 SendSesult 中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。
在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。
在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?
那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。
@Override
public String getMsgId() {
String uniqID = MessageClientIDSetter.getUniqID(this);
if (uniqID == null) {
return this.getOffsetMsgId();
} else {
return uniqID;
}
}
原来在调用 MessageClientExt 中的 getMsgId 方法时,如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。
而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:
温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发送变化的,但该消息的 offsetMsgId 会发送变化,因为其存储在服务器中的位置发生了变化。
3、实践经验
在解答了消息发送与消息消费关于msgId与offsetMsgId的困扰后,再来介绍一下如果根据 msgId 去查询消息。
想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?
其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:
欢迎加入我的知识星球,一起交流源码,探讨架构,打造高质量的技术交流圈,长按如下二维码
中间件兴趣圈 知识星球 正在对如下话题展开如火如荼的讨论:
1、【让天下没有难学的Netty-网络通道篇】
2、Java 并发框架(JUC) 探讨【面试神器】
3、源码分析Alibaba Sentienl 专栏背后的写作与学习技巧。